package payloadhub

import (
	"sync"
)

// PayloadNode is one slot of the hub's ring of payloads: it stores a single
// payload value and a link to the next slot in the ring.
type PayloadNode struct {
	payload interface{}
	next    *PayloadNode
}

// PayloadHub owns a fixed-size ring of PayloadNode slots and notifies
// watchers whenever a payload is pushed into it.
//
// watcherMap is the set of watchers currently inside a Watcher loop,
// watcherWg counts those loops so Stop can wait for them to return, and
// running is cleared by Stop, after which Push rejects new payloads.
// cond.L is the lock held around updates to running, head and watcherMap.
type PayloadHub struct {
	HubOptions
	watcherMap map[*PayloadWatcher]struct{}
	watcherWg  sync.WaitGroup
	running    bool

	head *PayloadNode // next slot that Push writes into
	cond *sync.Cond   // broadcast whenever a payload is stored so that waiting watchers wake up
}

// PayloadWatcher is a consumer handle for a PayloadHub; its Watcher method
// runs the delivery loop and its Stop method ends it.
//
// mu is held while Watcher and Stop on this type update running.
type PayloadWatcher struct {
	WatcherOptions
	hub     *PayloadHub
	mu      sync.Mutex
	running bool

	head *PayloadNode  // this watcher's read position in the ring
	tail **PayloadNode // address of the hub's head field, i.e. the hub's current write position
	cond *sync.Cond    // the hub's cond; the Watcher loop waits on it for new payloads
}

// NewPayloadHub builds a running hub whose ring of PayloadSize nodes is
// allocated up front; the ring never grows afterwards.
//
// The supplied options are applied on top of an empty HubOptions and then
// completed by withDefaults.
// NOTE(review): the ring is closed unconditionally, so PayloadSize is assumed
// to be at least 1 once defaults are applied — confirm withDefaults ensures it.
func NewPayloadHub(ops ...HubOption) *PayloadHub {
	var opt HubOptions
	opt.addOptions(ops...)
	opt = opt.withDefaults()

	hub := &PayloadHub{
		HubOptions: opt,
		watcherMap: map[*PayloadWatcher]struct{}{},
	}

	// Chain the nodes one after another, remembering the most recent one so
	// the next node can be linked behind it.
	var last *PayloadNode
	for remaining := opt.PayloadSize; remaining > 0; remaining-- {
		cur := new(PayloadNode)
		if last == nil {
			hub.head = cur
		} else {
			last.next = cur
		}
		last = cur
	}
	// Link the final node back to the first one to close the ring.
	last.next = hub.head

	hub.running = true
	hub.cond = sync.NewCond(new(sync.Mutex))

	return hub
}

// Stop shuts the hub down: it marks the hub and every registered watcher as
// not running, wakes all waiting watchers, and then blocks until every
// Watcher loop counted in watcherWg has returned. After Stop, Push returns
// false.
//
// Because Stop waits for the Watcher loops to finish, it must not be called
// from inside a watcher callback.
func (hub *PayloadHub) Stop() {
	hub.cond.L.Lock()
	hub.running = false
	for w := range hub.watcherMap {
		// The watcher guards its running flag with its own mutex (see
		// PayloadWatcher.Stop), so take it here as well to avoid a data race.
		w.mu.Lock()
		w.running = false
		w.mu.Unlock()
	}
	// Broadcast while still holding the lock so a watcher cannot slip into
	// Wait between the flag update and the wake-up.
	hub.cond.Broadcast()
	hub.cond.L.Unlock()

	hub.watcherWg.Wait()
}

// IsRunning reports whether the hub is still accepting payloads; it is true
// until Stop has been called.
func (hub *PayloadHub) IsRunning() bool {
	hub.cond.L.Lock()
	state := hub.running
	hub.cond.L.Unlock()
	return state
}

// Push stores payload in the ring slot at the hub's head, advances the head
// to the next slot and wakes every waiting watcher. It returns false, without
// storing anything, once the hub has been stopped.
func (hub *PayloadHub) Push(payload interface{}) bool {
	hub.cond.L.Lock()
	defer hub.cond.L.Unlock()

	if !hub.running {
		return false
	}

	slot := hub.head
	slot.payload = payload
	hub.head = slot.next

	// Let all watchers know that a new payload is available.
	hub.cond.Broadcast()
	return true
}

// NewPayloadHubWatcher creates a watcher bound to this hub and sharing its
// cond. Nil options are skipped. The watcher is only constructed here; it is
// added to the hub's watcher set when its Watcher method is called.
func (hub *PayloadHub) NewPayloadHubWatcher(ops ...WatcherOption) *PayloadWatcher {
	var settings WatcherOptions
	for i := range ops {
		if apply := ops[i]; apply != nil {
			apply(&settings)
		}
	}

	return &PayloadWatcher{
		WatcherOptions: settings,
		hub:            hub,
		cond:           hub.cond,
	}
}

// WatcherLen returns the number of watchers currently registered with the
// hub, i.e. those inside a running Watcher loop.
//
// The watcher set is modified under the hub lock, so the lock is taken here
// too; like IsRunning, this must not be called while that lock is already
// held (for example from a reliable-mode watcher callback).
func (hub *PayloadHub) WatcherLen() int {
	hub.cond.L.Lock()
	defer hub.cond.L.Unlock()
	return len(hub.watcherMap)
}

// Watcher registers the watcher with its hub and then blocks, passing every
// payload pushed after registration to callback, until the watcher or the hub
// is stopped.
//
// Only one Watcher loop may run per PayloadWatcher: a call made while another
// loop is active returns immediately. A call made after the hub has been
// stopped also returns immediately without invoking callback.
//
// When the reliable option is set, callback runs with the hub lock held and
// the backlog is drained completely even if a stop was requested meanwhile;
// callback must therefore not call anything that takes the hub lock, such as
// Push, IsRunning or Stop. Otherwise callback runs without the lock and
// delivery ends as soon as a stop is observed.
func (watcher *PayloadWatcher) Watcher(callback func(payload interface{})) {
	watcher.mu.Lock()
	if watcher.running {
		watcher.mu.Unlock()
		return
	}
	watcher.running = true
	watcher.mu.Unlock()

	hub := watcher.hub

	// Register under the hub lock, and only while the hub is running. Doing
	// the WaitGroup Add inside this critical section orders it before the
	// Wait in PayloadHub.Stop; registering on a stopped hub would leave this
	// loop waiting for a wake-up that never comes.
	hub.cond.L.Lock()
	if !hub.running {
		hub.cond.L.Unlock()

		watcher.mu.Lock()
		watcher.running = false
		watcher.mu.Unlock()
		return
	}
	hub.watcherWg.Add(1)
	watcher.head = hub.head
	watcher.tail = &hub.head
	hub.watcherMap[watcher] = struct{}{}
	hub.cond.L.Unlock()

	defer func() {
		hub.cond.L.Lock()
		delete(hub.watcherMap, watcher)
		hub.cond.L.Unlock()

		hub.watcherWg.Done()
	}()

	// active reports whether the watcher should keep running. It must be
	// called with the hub lock held: PayloadHub.Stop clears the flag under
	// the hub lock while PayloadWatcher.Stop clears it under watcher.mu, so
	// holding both makes the read safe against either writer.
	active := func() bool {
		watcher.mu.Lock()
		defer watcher.mu.Unlock()
		return watcher.running
	}

	for {
		watcher.cond.L.Lock()
		if !active() {
			watcher.cond.L.Unlock()
			return
		}
		// Sleep until there is something to deliver or a stop is requested.
		for active() && watcher.head == *watcher.tail {
			watcher.cond.Wait()
		}

		if watcher.reliable {
			// Drain the whole backlog while holding the lock so Push cannot
			// move the write position underneath us.
			for watcher.head != *watcher.tail {
				callback(watcher.head.payload)
				watcher.head = watcher.head.next
			}
			watcher.cond.L.Unlock()
			continue
		}
		watcher.cond.L.Unlock()

		// Unreliable mode: take the lock only long enough to check the state
		// and copy the payload, then run callback without blocking Push.
		for {
			watcher.cond.L.Lock()
			pending := active() && watcher.head != *watcher.tail
			var payload interface{}
			if pending {
				payload = watcher.head.payload
			}
			watcher.cond.L.Unlock()

			if !pending {
				break
			}
			callback(payload)
			// The read position is only touched by this goroutine and the
			// ring links never change, so no lock is needed to advance.
			watcher.head = watcher.head.next
		}
	}
}

// Stop asks this watcher's Watcher loop to end: it clears the running flag
// and then broadcasts on the hub's cond so a loop blocked waiting for data
// wakes up and notices. It does not wait for the loop to return.
func (watcher *PayloadWatcher) Stop() {
	func() {
		watcher.mu.Lock()
		defer watcher.mu.Unlock()
		watcher.running = false
	}()

	// Broadcast under the hub lock so the wake-up cannot be missed by a loop
	// that is just about to wait.
	cond := watcher.hub.cond
	cond.L.Lock()
	defer cond.L.Unlock()
	cond.Broadcast()
}
